In PostgreSQL 16 and earlier, database backups relied mainly on full backups plus WAL archiving. This traditional scheme is reliable, but it comes with quite a few inconveniences.
That said, the "inconveniences" above are the kind of thing you can copy off the internet (and I did). In my own working practice, the main problems with the full backup + WAL scheme were more concrete.
PostgreSQL 17 introduces incremental backup as an official, built-in feature, which addresses many of the traditional scheme's pain points at the root.
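At its core, the feature adds an incremental mode to pg_basebackup that is driven by the previous backup's manifest. A minimal sketch of the two primitives this post builds on (paths and user are illustrative):

```python
# Minimal sketch: a full backup, then an incremental one that references
# the full backup's manifest (PostgreSQL 17's pg_basebackup --incremental).
import subprocess

subprocess.run(["pg_basebackup", "-D", "D:/Backup/full_0",
                "-U", "postgres"], check=True)
subprocess.run(["pg_basebackup", "-D", "D:/Backup/incr_1",
                "--incremental=D:/Backup/full_0/backup_manifest",
                "-U", "postgres"], check=True)
```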
Still, one thing kept nagging at me: incremental backup is efficient, but it is not perfect on its own. It only captures the data blocks that changed between two backup points, which leaves a time-gap risk: if a failure occurs after the latest incremental backup has completed but before the next one is due, the changes made in that window could be lost. How do we capture the data changes in that gap?
So in the end it comes back to the WAL. The difference is that now I only need to keep the WAL generated after the most recent incremental backup.
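To make the retention rule concrete, here is a minimal stdlib sketch of the invariant (directory names are illustrative): whatever WAL was archived after the newest backup is exactly what a restore would need on top of it.

```python
# Sketch: WAL archived after the newest backup covers the gap between that
# backup and "now"; everything older can eventually be deleted.
from pathlib import Path

backups = sorted(Path("D:/Backup").iterdir(), key=lambda p: p.name)
latest_backup_time = backups[-1].stat().st_ctime

gap_wal = [
    f for f in Path("D:/Achieve").iterdir()
    if f.is_file() and f.stat().st_ctime >= latest_backup_time
]
print(f"{len(gap_wal)} archived WAL files are needed to cover the gap")
```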
Following this line of thought, I designed a brand-new backup script. The complete script appears later in this post; first, a quick outline of the approach:
Backup folders follow the naming pattern `{YYYY}-{mm}-{dd}-{HHMMSS}_{TYPE}_{INDEX}`, so real folder names look like `2025-01-23-050002_FULL_0` and `2025-01-24-223002_INCR_17`. The name alone tells you when the backup was taken, its position within the current round, and its type, and the folders sort chronologically by name.
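A nice property of this convention: because the timestamps are zero-padded, plain string sorting is chronological, and the suffix can be split off to recover the type and index. A toy demonstration:

```python
# Toy demonstration of the folder-name convention (names are examples)
names = ["2025-01-24-223002_INCR_17", "2025-01-23-050002_FULL_0"]

latest = sorted(names)[-1]             # lexicographic == chronological here
stamp, backup_type, index = latest.rsplit("_", 2)
print(stamp, backup_type, int(index))  # 2025-01-24-223002 INCR 17
```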
Whether a backup succeeded is judged by checking that `base.tar`, `pg_wal.tar`, and `backup_manifest` all exist in the new folder. Before any of this works, the following settings in `postgresql.conf` need to be changed:
```
# make sure the WAL level is at least replica
wal_level = replica
# enable WAL archiving
archive_mode = on
# archive each finished WAL segment into this directory (Windows copy command)
archive_command = 'copy %p D:\\********\\Achieve\\%f'
# enable the walsummarizer process that records WAL summary files;
# incremental backup depends on this
summarize_wal = on
# WAL summary files are pruned automatically after this period; one full
# round (full backup plus all incrementals) must fit inside it
wal_summary_keep_time = '10d'
```
Then restart the PostgreSQL service.
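It may be worth verifying that the settings took effect before wiring up the script. A minimal check, assuming the psycopg driver is installed (connection parameters are illustrative):

```python
# Sanity check after the restart: is summarize_wal on, and is the
# walsummarizer actually producing summary files?
import psycopg

with psycopg.connect("host=localhost user=postgres") as conn:
    with conn.cursor() as cur:
        cur.execute("SHOW summarize_wal")
        print("summarize_wal =", cur.fetchone()[0])
        cur.execute("SELECT count(*) FROM pg_available_wal_summaries()")
        print("WAL summaries available:", cur.fetchone()[0])
```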
Create a `.env` file in the script directory to hold the configuration:
```
# Parent directory that holds the individual backup folders
BACKUP_DIR = "D:/********/Backup"
# The WAL archive directory configured in archive_command
ARCHIVE_DIR = "D:/********/Achieve"
# Log file location
LOG_FILE = "D:/********/log.txt"
# Path to the pg_basebackup executable
PG_BASEBACKUP_PATH = "C:/Program Files/PostgreSQL/17/bin/pg_basebackup.exe"
# Database user
PGUSER = "postgres"
# Database password
PGPASSWORD = "************"
# Maximum number of incremental backups: with 6, each round consists of
# 1 full backup and 6 incrementals; if the script runs once a day, a full
# round takes 7 days and produces 7 folders
MAX_INCREMENT = "6"
# Maximum number of rounds to keep: with 2, anything older than the two
# most recent rounds is deleted
MAX_ROLL = "2"
# DingTalk webhook URL
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=******"
# Phone number to @-mention in the DingTalk notification
DINGTALK_AT_MOBILE = "137******87"
```
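Since a missing key would otherwise only surface midway through a run, a small fail-fast guard at the top of the script can help. A sketch (the key list simply mirrors the variables above):

```python
# Fail fast on missing configuration before any backup work starts
import os
from dotenv import load_dotenv

REQUIRED_KEYS = [
    "BACKUP_DIR", "ARCHIVE_DIR", "LOG_FILE", "PG_BASEBACKUP_PATH",
    "PGUSER", "PGPASSWORD", "MAX_INCREMENT", "MAX_ROLL",
    "DINGTALK_WEBHOOK", "DINGTALK_AT_MOBILE",
]

load_dotenv()
missing = [key for key in REQUIRED_KEYS if not os.getenv(key)]
if missing:
    raise SystemExit(f"Missing .env keys: {', '.join(missing)}")
```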
Then create the script itself in the same directory:

```python
import logging
import os
import shutil
import subprocess
import time
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Literal
import httpx
from dotenv import load_dotenv
def init_logging(log_file):
    """
    Configure logging to a rotating file and to the console.
    """
    file_handler = RotatingFileHandler(log_file, maxBytes=200 * 1024, backupCount=3, encoding="utf-8")
    console_handler = logging.StreamHandler()
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[file_handler, console_handler]
    )
def perform_incr_backup(pg_basebackup_path: Path, pguser, backup_dir: Path, last_backup_dir: Path | None, backup_type: Literal['FULL', 'INCR']):
    # For an incremental backup, pass the previous backup's manifest via -i;
    # guard against last_backup_dir being None on the very first run
    if backup_type == 'INCR' and last_backup_dir is not None and last_backup_dir.exists():
        cmd = [str(pg_basebackup_path), '-Ft', '-D', str(backup_dir), '-v', '-i', str(last_backup_dir / 'backup_manifest'), '-U', pguser]
    else:
        cmd = [str(pg_basebackup_path), '-Ft', '-D', str(backup_dir), '-P', '-U', pguser]
    try:
        # PGPASSWORD loaded from .env is inherited through the environment
        _r = subprocess.run(cmd, env=os.environ.copy(), capture_output=True, text=True, check=True)
        return True, _r
    except subprocess.CalledProcessError as _e:
        return False, _e
def check_success(backup_dir: Path):
    # The backup counts as successful only if all three artifacts exist
    return (
        backup_dir.is_dir() and
        (backup_dir / 'base.tar').is_file() and
        (backup_dir / 'pg_wal.tar').is_file() and
        (backup_dir / 'backup_manifest').is_file()
    )
def get_latest_backup(backup_dir: Path, max_increment):
    _sub_dirs = [item for item in backup_dir.iterdir() if item.is_dir()]
    # Find the most recent backup and its index within the current round
    if len(_sub_dirs) == 0:
        _latest_dir = None
        _latest_index = None
    else:
        _latest_dir = sorted(_sub_dirs, key=lambda x: x.name)[-1]
        _latest_index = int(_latest_dir.name.split("_")[-1])
    # While the last index is below the incremental cap, keep making
    # incremental backups; otherwise start a new round with a full backup
    if _latest_index is not None and _latest_index < max_increment:
        backup_type: Literal['FULL', 'INCR'] = "INCR"
        index = _latest_index + 1
    else:
        backup_type: Literal['FULL', 'INCR'] = "FULL"
        index = 0
    return _latest_dir, index, backup_type
def clean_wal(wal_archive_dir: Path):
    # .backup marker files are written into the WAL archive by each base
    # backup; they mark the WAL position of that backup
    wal_files = [f for f in wal_archive_dir.glob('*.backup') if f.is_file()]
    # At least two markers are needed: keep everything since the
    # second-newest one so the previous backup's WAL stays replayable
    if len(wal_files) >= 2:
        sorted_backup_files = sorted(wal_files, key=os.path.getctime, reverse=True)
        reference_wal_file = sorted_backup_files[1]
        reference_wal_time = os.path.getctime(reference_wal_file)
        reference_wal_time_desc = datetime.fromtimestamp(reference_wal_time).strftime('%Y-%m-%d %H:%M:%S')
        logging.info(f" Will delete WAL files archived before {reference_wal_file} ({reference_wal_time_desc})")
        all_wal_files = [f for f in wal_archive_dir.iterdir() if f.is_file() and os.path.getctime(f) < reference_wal_time]
        del_count, err_count, all_count = 0, 0, len(all_wal_files)
        for file in all_wal_files:
            try:
                os.remove(file)
                del_count += 1
                logging.debug(f" Deleted archived WAL file: {file}")
            except Exception as _e:
                logging.error(f" Error deleting archived WAL file {file}: {_e}")
                err_count += 1
        logging.info(f" Cleaned {del_count}/{all_count} archived WAL files, {err_count} errors.")
        return del_count, err_count, all_count
    else:
        logging.info(" No archived WAL files need cleaning")
        return 0, 0, 0
def clean_backup(max_roll: int, backup_dir: Path):
    full_backups = [
        folder for folder in backup_dir.iterdir()
        if folder.is_dir() and 'FULL' in folder.name
    ]
    full_backups.sort(key=lambda x: x.name)
    logging.info(f" Keeping {max_roll} rounds of backups; {len(full_backups)} rounds currently exist")
    if len(full_backups) > max_roll:
        # The oldest full backup to keep; anything created before it goes
        cutoff_backup = full_backups[- max_roll]
        backups_to_remove = [
            folder for folder in backup_dir.iterdir()
            if folder.is_dir() and
            folder.stat().st_ctime < cutoff_backup.stat().st_ctime
        ]
        logging.info(f" Removing all backups created before {cutoff_backup}")
        del_count, err_count, all_count = 0, 0, len(backups_to_remove)
        for backup in backups_to_remove:
            try:
                shutil.rmtree(backup)
                logging.debug(f" Deleted backup {backup}")
                del_count += 1
            except Exception as _e:
                logging.error(f" Error deleting backup {backup}: {_e}")
                err_count += 1
        logging.info(f" Cleaned {del_count}/{all_count} backups, {err_count} errors.")
        return del_count, err_count, all_count
    else:
        logging.info(" No backups need cleaning")
        return 0, 0, 0
def dingtalk_alert(webhook_url:str, at_mobiles:str, title:str, text:str):
httpx.post(webhook_url, json={
"msgtype": "markdown",
"markdown": {"title":title, "text": text},
"at": {"atMobiles": [at_mobiles], "isAtAll": False}
})
def count_backup(backup_dir: Path, now_index: int):
    full_backups = [
        folder for folder in backup_dir.iterdir()
        if folder.is_dir() and 'FULL' in folder.name
    ]
    full_backups.sort(key=lambda x: x.stat().st_ctime)
    if not full_backups:
        # No full backup on disk (e.g. the very first run just failed)
        return 0, 0, 0.0, False
    this_roll_full_backup = full_backups[-1]
    later_backups = [
        folder for folder in backup_dir.iterdir()
        if folder.is_dir() and folder.stat().st_ctime > this_roll_full_backup.stat().st_ctime
    ]
    later_backups_count = len(later_backups)
    # The round is consistent if everything after the latest full backup is
    # incremental and the count matches the current index
    check = (later_backups_count == now_index and all('INCR' in folder.name for folder in later_backups))
    incr_backups = [
        folder for folder in backup_dir.iterdir()
        if folder.is_dir() and 'INCR' in folder.name
    ]
    files = [f for f in backup_dir.rglob('*') if f.is_file()]
    total_size = sum(f.stat().st_size for f in files)
    total_gb = round(total_size / (1024 * 1024 * 1024), 2)
    return len(full_backups), len(incr_backups), total_gb, check
if __name__ == "__main__":
    # Load the .env file
load_dotenv()
    # Read configuration from environment variables
BACKUP_DIR = Path(os.getenv('BACKUP_DIR'))
ARCHIVE_DIR = Path(os.getenv('ARCHIVE_DIR'))
PG_BASEBACKUP_PATH = Path(os.getenv('PG_BASEBACKUP_PATH'))
PGUSER = os.getenv('PGUSER')
PGPASSWORD = os.getenv('PGPASSWORD')
LOG_FILE = os.getenv('LOG_FILE')
MAX_INCREMENT = int(os.getenv('MAX_INCREMENT'))
MAX_ROLL = int(os.getenv('MAX_ROLL'))
DINGTALK_WEBHOOK = os.getenv('DINGTALK_WEBHOOK')
DINGTALK_AT_MOBILE = os.getenv('DINGTALK_AT_MOBILE')
    # Initialize the logger
init_logging(LOG_FILE)
logging.info(">>>>> BACKUP START")
t = time.time()
    # Find the most recent backup on disk
latest_backup, next_index, next_backup_type = get_latest_backup(BACKUP_DIR, MAX_INCREMENT)
logging.info(f"STEP-1 查询到上一次的备份是:{latest_backup}")
logging.info(f"STEP-2 正在准备进行 {next_backup_type} 备份,为该轮备份的第 {next_index}/{MAX_INCREMENT} 次备份。")
# 创建新备份的目录
new_backup:Path = BACKUP_DIR / datetime.now().strftime(f"%Y-%m-%d-%H%M%S_{next_backup_type}_{next_index}")
new_backup.mkdir(parents=True, exist_ok=True)
logging.info(f" 为新备份创建了目录:{new_backup}")
logging.info(f" 调用 {PG_BASEBACKUP_PATH} 进行备份中...")
logging.info(f" 使用 {PGUSER} 用户...")
is_success, result = perform_incr_backup(PG_BASEBACKUP_PATH, PGUSER, new_backup, Path(latest_backup), next_backup_type)
during_time = round((time.time() - t) / 60, 2)
    # If the backup succeeded
if is_success and check_success(new_backup):
logging.info(f" 备份成功: {new_backup}")
logging.info(f"STEP-3 准备清理 {ARCHIVE_DIR} 中归档的 WAL 文件")
wal_clean_del_count, wal_clean_err_count, wal_clean_all_count = clean_wal(ARCHIVE_DIR)
logging.info(f"STEP-4 准备清理 {BACKUP_DIR} 中的过期备份")
backup_clean_del_count, backup_clean_err_count, backup_clean_all_count = clean_backup(max_roll=MAX_ROLL, backup_dir=BACKUP_DIR)
alert = "🟢 数据库备份成功"
    else:
        shutil.rmtree(new_backup)
        logging.error(f" Backup failed; the directory has been removed. Reason: {result}")
        wal_clean_del_count, wal_clean_err_count, wal_clean_all_count = 0, 0, 0
        backup_clean_del_count, backup_clean_err_count, backup_clean_all_count = 0, 0, 0
        alert = "🔴 Database backup failed"
full_backups_count, incr_backups_count, backup_size, this_roll_check = count_backup(BACKUP_DIR, next_index)
    # Compose the DingTalk markdown message
    alert_markdown = (
        f"# {alert} \n "
        f"### This backup \n "
        f"- Type: {next_backup_type} \n "
        f"- Position in round: {next_index}/{MAX_INCREMENT} \n "
        f"- Folder: {new_backup} \n "
        f"- Status: {alert[2:]} \n "
        f"- Duration: {during_time} minutes \n "
        f"### Cleanup \n "
        f"- WAL cleanup: {wal_clean_del_count}/{wal_clean_all_count} deleted, {wal_clean_err_count} errors \n "
        f"- Backup cleanup: {backup_clean_del_count}/{backup_clean_all_count} deleted, {backup_clean_err_count} errors \n "
        f"### Backup inventory \n "
        f"- Full backups in total: {full_backups_count} \n "
        f"- Incremental backups in total: {incr_backups_count} \n "
        f"- Total backup size: {backup_size} GB \n "
        f"- Current round consistent: {this_roll_check} \n "
        f"--- \n "
        f"{datetime.now().strftime('%Y/%m/%d %H:%M:%S')}"
)
    dingtalk_alert(DINGTALK_WEBHOOK, DINGTALK_AT_MOBILE, alert, alert_markdown)
    logging.info(">>>>> BACKUP END\n\n")
```
Because I am on Windows, I simply added a trigger in Task Scheduler to run the script once a day at 4:15 AM.
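For reference, the same schedule could also be registered from the command line instead of the GUI; a sketch via subprocess, where the task name and script path are illustrative:

```python
# Register a daily 04:15 run with Windows Task Scheduler (sketch; I set
# mine up through the GUI, and the name and path here are assumptions)
import subprocess

subprocess.run([
    "schtasks", "/Create",
    "/TN", "pg17-incremental-backup",
    "/TR", r"python D:\********\backup.py",
    "/SC", "DAILY",
    "/ST", "04:15",
], check=True)
```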
A sample run produces a log like this:

```
2025-01-25 13:47:28,363 - INFO - >>>>> BACKUP START
2025-01-25 13:47:28,364 - INFO - STEP-1 The previous backup is: D:\********\Backup\2025-01-25-133906_FULL_0
2025-01-25 13:47:28,364 - INFO - STEP-2 Preparing a FULL backup, number 0/24 in this round.
2025-01-25 13:47:28,364 - INFO -  Created a directory for the new backup: D:\********\Backup\2025-01-25-134728_FULL_0
2025-01-25 13:47:28,364 - INFO -  Calling C:\Program Files\PostgreSQL\17\bin\pg_basebackup.exe to run the backup...
2025-01-25 13:47:28,364 - INFO -  Connecting as user postgres...
2025-01-25 13:48:18,572 - INFO -  Backup succeeded: D:\********\Backup\2025-01-25-134728_FULL_0
2025-01-25 13:48:18,572 - INFO - STEP-3 Cleaning archived WAL files in D:\********\Achieve
2025-01-25 13:48:18,573 - INFO -  Will delete WAL files archived before D:\********\Achieve\000000010000006F000000A7.00000490.backup (2025-01-25 13:40:07)
2025-01-25 13:48:18,573 - INFO -  Cleaned 3/3 archived WAL files, 0 errors.
2025-01-25 13:48:18,573 - INFO - STEP-4 Cleaning expired backups in D:\PostgreSQL_Backup\Backup
2025-01-25 13:48:18,574 - INFO -  Keeping 2 rounds of backups; 2 rounds currently exist
2025-01-25 13:48:18,574 - INFO -  No backups need cleaning
2025-01-25 13:48:19,541 - INFO - HTTP Request: POST https://oapi.dingtalk.com/robot/send?access_token=****** "HTTP/1.1 200 OK"
2025-01-25 13:48:19,544 - INFO - >>>>> BACKUP END
```
And of course, the DingTalk notification comes through as well.
**Backup storage is still not ideal.** For now I keep the backups on a separate disk and rely on third-party backup software to copy them to the cloud and to a dedicated backup drive. That is reasonably safe, but it adds an extra software dependency. Ideally, once a round completes, the whole round would be merged into a single backup and archived to S3 storage, as sketched below.
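PostgreSQL 17 actually ships a tool for the merging half of this: `pg_combinebackup` reconstructs a synthetic full backup from a full backup plus its chain of increments. Note that it operates on plain-format directories, so tar-format backups like the ones above must be extracted first. A rough sketch of the merge-and-archive idea, where boto3, the bucket name, and all paths are assumptions:

```python
# Sketch: fold one round (full backup + increments, already extracted from
# their tar files) into a synthetic full backup, then archive it to S3.
import shutil
import subprocess

import boto3

round_dirs = [
    "D:/Restore/2025-01-23-050002_FULL_0",
    "D:/Restore/2025-01-24-050002_INCR_1",
    # ... the remaining increments of the round, oldest to newest
]
subprocess.run(["pg_combinebackup", *round_dirs, "-o", "D:/Restore/combined"],
               check=True)

# Compress the combined directory and upload it (bucket/key illustrative)
shutil.make_archive("D:/Restore/combined", "zip", "D:/Restore/combined")
boto3.client("s3").upload_file("D:/Restore/combined.zip",
                               "my-backup-bucket", "pg/combined-latest.zip")
```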
**Backup verification is incomplete.** The script only checks that the expected files exist; it never proves the backups can actually be restored. Automating that would take a separate pipeline, for example spinning up a cloud server to run a restore check whenever a new archive lands in S3.
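A cheap middle ground already exists: `pg_verifybackup` checks every file of a backup against its `backup_manifest`, which is much stronger than an existence check, though still not a restore test. A sketch (the path is illustrative; tar-format backups may need extracting first):

```python
# Sketch: manifest-based integrity check of a single backup directory.
# This validates file presence and checksums, not actual recoverability.
import subprocess

result = subprocess.run(
    ["pg_verifybackup", "D:/Restore/2025-01-25-134728_FULL_0"],
    capture_output=True, text=True,
)
print("backup verified" if result.returncode == 0 else result.stderr)
```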
**There is no restore script yet.** If a real recovery were needed today, someone would still have to merge the backups by hand and run the commands on the server; a standardized restore procedure has not been built.
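For reference, the manual procedure that a future restore script would need to automate looks roughly like this (a sketch with illustrative paths, not a tested runbook):

```python
# Rough restore outline for one round of tar-format backups
import subprocess

# 1. Extract base.tar (and pg_wal.tar) of the full backup and of every
#    increment into separate plain directories, e.g. D:/Restore/full,
#    D:/Restore/incr1, D:/Restore/incr2, ...
# 2. Merge them into a single data directory:
subprocess.run(
    ["pg_combinebackup", "D:/Restore/full", "D:/Restore/incr1",
     "D:/Restore/incr2", "-o", "D:/Restore/pgdata"],
    check=True,
)
# 3. In postgresql.conf, point restore_command at the WAL archive so replay
#    covers the gap after the last increment, e.g.:
#       restore_command = 'copy D:\\********\\Achieve\\%f %p'
# 4. Create an empty recovery.signal file in D:/Restore/pgdata and start
#    PostgreSQL against it; recovery will replay the archived WAL.
```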